package com.bruce.tool.mq.aliyun.core;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.bruce.tool.common.exception.BaseRuntimeException;
import com.bruce.tool.common.util.LogUtils;
import com.bruce.tool.common.util.string.JsonUtils;
import com.bruce.tool.common.util.valid.ValidUtils;
import com.bruce.tool.mq.aliyun.config.AliMqConfig;
import com.bruce.tool.mq.aliyun.constant.AliMqCode;
import com.bruce.tool.mq.aliyun.constant.AliMqPool;
import com.bruce.tool.mq.aliyun.domain.AliMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * 功能 :
 * 发送者
 * @author : Bruce(刘正航) 11:02 2019-02-14
 */
@Slf4j
@Component
public class AliSender {

    @Autowired
    private AliMqPool pool;

    /**发送同步消息**/
    public void execute(AliMessage message){
        this.execute(message,null);
    }

    /**发送同步消息**/
    public void execute(AliMessage message, SendCallback callback){
        ValidUtils.valid(message);

        AliMqConfig config = pool.getConfigs().get(message.getCode());
        if(Objects.isNull(config) ){
            throw new BaseRuntimeException(AliMqCode.MQERROR_CONFIG_NULL.getCode(), AliMqCode.MQERROR_CONFIG_NULL.getMessage());
        }

        Producer producer = pool.getSenders().get(message.getCode());
        if(Objects.isNull(producer) ){
            throw new BaseRuntimeException(AliMqCode.MQERROR_SENDER_NULL.getCode(), AliMqCode.MQERROR_SENDER_NULL.getMessage());
        }

        Message msg = buildMessage(message, config);

        if( Objects.nonNull(callback) ){
            async(producer, msg, callback);
            return;
        }
        sync(producer, msg);
    }

    /**同步执行**/
    private void sync(Producer producer, Message msg) {
        try {
            SendResult sendResult = producer.send(msg);
            // 同步发送消息，只要不抛异常就是成功
            LogUtils.debug(log, JsonUtils.objToStr(sendResult));
        } catch (Exception e) {
            throw new BaseRuntimeException(AliMqCode.MQERROR_SEND_EXCEPTION.getCode(), AliMqCode.MQERROR_SEND_EXCEPTION.getMessage());
        }
    }

    /**异步执行**/
    private void async(Producer producer, Message msg, SendCallback callback) {
        producer.sendAsync(msg, callback);
    }

    /**构建消息**/
    private Message buildMessage(AliMessage message, AliMqConfig config) {
        Message msg = new Message(
                // Message 所属的 Topic
                config.getTopic(),
                // Message Tag 可理解为 Gmail 中的标签，对消息进行再归类，方便 Consumer 指定过滤条件在 MQ 服务器过滤
                config.getTags(),
                // Message AliMessage 可以是任何二进制形式的数据， MQ 不做任何干预，
                // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                message.getBody());
        // 设置代表消息的业务关键属性，请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下，可通过阿里云服务器管理控制台查询消息并补发
        // 注意：不设置也不会影响消息正常收发
        if(StringUtils.isNotBlank(message.getKey())){
            msg.setKey(message.getKey());
        }
        return msg;
    }
}
